using System.Text;
using Lemon.BackgroundJobs.Abstractions;
using Lemon.BackgroundJobs.Abstractions.WorkQueues;
using RabbitMQ.Client.Events;

namespace Lemon.BackgroundJobs.RabbitMQ.WorkQueues;

/// <summary>
/// 竞争消费者模式
/// </summary>
public class WorkQueues : IWorkQueues
{
    private readonly IRabbitMQClient _rabbitMqClient;
    public WorkQueues(IRabbitMQClient rabbitMqClient)
    {
        _rabbitMqClient = rabbitMqClient;
    }

    /// <summary>
    /// 发布消息(竞争消费者模式)
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="msg"></param>
    /// <param name="queue"></param>
    public void Publish<T>(T msg, string queue) where T : class, new()
    {
        using (var channel = _rabbitMqClient.GetConnection().CreateModel())
        {
            channel.QueueDeclare(queue: queue,
                durable: true,
                exclusive: false,
                autoDelete: false,
                arguments: null);

            var message = Newtonsoft.Json.JsonConvert.SerializeObject(msg);
            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            channel.BasicPublish(exchange: "",
                routingKey: queue,
                false,
                basicProperties: properties,
                body: body);
        }
    }

    /// <summary>
    /// 工作者订阅(竞争消费者模式)
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="queue"></param>
    /// <param name="handler"></param>
    public void Subscribe<T>(string queue, Action<T> handler) where T : class, new()
    {
        var channel = _rabbitMqClient.GetConnection().CreateModel();
        channel.QueueDeclare(queue: queue,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            var message = Encoding.UTF8.GetString(body);
            T? msg = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message);

            if (msg == null)
            {
                throw new Exception("message is null");
            }
            handler(msg);
            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
        };
        channel.BasicConsume(queue, false, "", false, false, null, consumer);
    }
}